iT邦幫忙

2025 iThome 鐵人賽

DAY 9
0

這篇我們總於要來看一下怎麼實際用 Effect 來實作一些功能了,第一篇是資料清理,我們要來看怎麼使用 Effect 協助我們清理資料,並安全的處理各種錯誤的情況

所有的實戰分享都是我有實際使用過 Effect 的案例,雖然大部份的案例沒辦法讓各位看到原本的實際的程式碼,不過我會盡可能的讓案例符合現實的情況

故事開始

今天你拿到了一個老舊的系統,這個老舊系統大部份的資料並沒有使用資料庫,前人將資料使用一堆 json 檔存了起來,今天你的目標是將這些資料想辦法存到資料庫中,但存進去前,你碰到了幾個問題

  1. 存在檔案裡面的資料,有些是老舊的格式,也還有些是異常的資料
  2. 有些資料同時存在資料庫跟檔案中

以上的情況經過一點改編,我知道這個系統設計聽起來很不可思議

好消息是前人還是留下了一些東西

  • 各個資料的 TypeScript 的 type 的定義,雖然不完全可信,因為存在著舊格式的資料還有異常的資料
  • 各種操作資料的 helper function
  • 資料都有一個 uuid 當作 id

在進行這項工程時,我們訂下了幾個準則:

  1. 保證資料牽移的幂等性,也就是多次執行同樣的動作,結果會是一樣的
  2. 能救回來的舊資料就盡量救,不能的才放棄
  3. 留下備份

關於第 1 點,因為牽扯到之後要講的資料牽移,我們等到「15. Effect 實戰分享 3: 資料遷移」才會再來聊到,這次一起來看第 2 點我們是怎麼實作的吧

定義什麼是正常的資料

在一開始,我們需要先知道哪些資料是正常的,因此做為第一步,我們把現有的 TypeScript 的 type 都轉換成了 zod 的 schema

雖然 Effect 也有 schema 的實作,不過大部份情況下,我還是偏好 zod 的實作,之後也可以聊聊我什麼時候會用 zod ,什麼時候會用 Effect 的

例如原本的 type 長的像這樣

interface Item {
  id: string
  field1: string
  field2: string
  // 有些欄位是後來加入的,它有預設值,但儲存資料裡面可能不一定有
  newButRequiredField: string
  // 有些欄位早就已經沒有用到了
  obsoleteField: string
}

那我們把這個 type 轉換成 zod 的 schema 吧

const ItemSchema = z.object({
  id: z.string(),
  field1: z.string(),
  field2: z.string(),
  newButRequiredField: z.string(),
  obsoleteField: z.string(),
})

接著再用 Effect 將整個 parse 的過程包起來

import { Array, Effect, Either } from 'effect'

function parseData(input: unknown) {
  return Effect.try({
    try: () => ItemSchema.parse(input),
    catch: (error) =>
      // 這邊我們可以用上一篇建立的 ValidationError
      new ValidationError({
        message: 'parse error',
        data: input,
        cause: error,
      }),
  })
}

const [errors, validData] = pipe(
  // 載入的是一個陣列資料
  loadData(),
  Effect.flatMap((data) =>
    pipe(
      data.map((item) => parseData(item)),
      // 還記得之前提過的 either mode 嗎?
      Effect.allWith({ mode: 'either' }),
    ),
  ),
  Effect.map((parsed) =>
    pipe(
      parsed,
      // 這邊使用了 Effect 提供的 Array 的 helper ,把資料分成成功 parse 的跟失敗的
      Array.partitionMap((result, index) =>
        // 將 Either 的 Left ,也就是錯誤的資料加上 `index` 方便我們之後去找這這些資料
        Either.mapLeft(result, (error) => ({ error, index })),
      ),
    ),
  ),
  Effect.runSync,
)

到這邊你可能會覺得:「那這跟我自己寫個迴圈處理有什麼差別?」,確實目前還可以寫成

const errors = []
const validData = []
const raw = loadData()
for (const [index, item] of raw.entries()) {
  const parsed = ItemSchema.safeParse(item)
  if (parsed.success) {
    validData.push(parsed.data)
  } else {
    errors.push({
      index,
      error: new ValidationError({
        message: 'parse error',
        data: item,
        cause: parsed.error 
      })
    })
  }
}

下面的版本可讀性感覺還比較好,但這還沒結束,讓我們先繼續下去。我們可以先把錯誤的部份印出來,使用 zod.fyi 可以讓錯誤訊息更好閱讀,那時我還自己寫了個小工具讓我不用一個一個的複製貼上

處理異常的資料

收集到各種的問題後,我們開始著手處理異常的資料,異常的資料大致有分成

  1. 必要的欄位缺少資料
  2. 非必要的欄位資料異常
  3. 整筆資料完全異常,例如那筆資料是 { error: 'Internal server error' }

到這邊,基本上就是不斷的調整 schema ,盡可能的拯救資料,例如為欄位加上預設值,直接放棄非必要的欄位

const ItemSchema = z.object({
  id: z.string(),
  field1: z.string(),
  field2: z.string(),
  // 加上預設值
  newButRequiredField: z.string().default('default value'),
  // 對照 code 後移除非必要的欄位
})

處理資料庫中可能也有的資料

這個系統在演進的過程中,在某次更新加入了資料庫,造成在上面的資料中,有部份已經被存一份到資料庫中了,有在資料庫中的資料,在紀錄時會多一個資料庫中的 id 欄位叫 serverId ,同樣也是 uuid

const ItemSchema = z.object({
  id: z.string(),
  serverId: z.string().optional(),
  field1: z.string(),
  field2: z.string(),
  newButRequiredField: z.string().default('default value'),
})

我們的目前是將資料轉移到資料庫中,對於已經在資料庫中有的資料就不用繼續處理了,於是我們這邊多了一個檢查是否已經存在於資料庫中的非同步的 function checkAvailableInDb ,接下來我們重新設計一下流程

  1. 所有正常的資料都要檢查是否已經存在於資料庫中
  2. 檢查時要控制一下 concurrency ,最多同時檢查 3 個

重新設計後的流程長的像這樣

const [errors, validData] = await pipe(
  loadData(),
  Effect.flatMap((data) =>
    pipe(
      data.map((item) => parseData(item)),
      Effect.allWith({ mode: 'either' }),
    ),
  ),
  Effect.map((parsed) =>
    pipe(
      parsed,
      Array.partitionMap((result, index) =>
        Either.mapLeft(result, (error) => ({ error, index })),
      ),
    ),
  ),
  Effect.flatMap(([errors, validData]) =>
    Effect.all([
      Effect.succeed(errors),
      Effect.filter(
        validData,
        (item) =>
          // 檢查是否在 db 中
          item.serverId
            ? pipe(
                checkAvailableInDb(item.serverId),
                Effect.map((available) => !available),
              )
            : Effect.succeed(false),
        // 控制最多同時執行 3 個
        { concurrency: 3 },
      ),
    ]),
  ),
  // 中間多了 async 的動作,換成使用 runPromise
  Effect.runPromise,
)

你可能會想到為什麼檢查是否存在沒辦法一次進行大量查詢,這牽涉到這個老舊系統當初的設計,如果可以一次處理,整體流程可以變的更簡單

我們稍微整理一下整個流程,在這邊你會發現, FP 的一個好處是每個操作都是各別的函式,要拆開來只需要把相關的 code 剪下貼上到一個 function 中,就可以很好的分解了,我們直接看以下的範例

interface ParseErrorResult {
  index: number
  error: ValidationError
}

type ParseResult = [ParseErrorResult[], Item[]]

function parseDatum(data: unknown[]): Effect.Effect<ParseResult> {
  return pipe(
    data.map((item) => parseData(item)),
    Effect.allWith({ mode: 'either' }),
    Effect.map((parsed) =>
      Array.partitionMap(parsed, (result, index) =>
        Either.mapLeft(result, (error) => ({ error, index })),
      ),
    ),
  )
}

function checkParseResultInDb([errors, validData]: ParseResult): Effect.Effect<ParseResult> {
  return Effect.all([
    Effect.succeed(errors),
    Effect.filter(
      validData,
      (item) =>
        item.serverId
          ? pipe(
              checkAvailableInDb(item.serverId),
              Effect.map((available) => !available),
            )
          : Effect.succeed(false),
      { concurrency: 3 },
    ),
  ])
}

const [errors, validData] = await pipe(
  loadData(),
  Effect.flatMap(parseDatum),
  Effect.flatMap(checkParseResultInDb),
  Effect.runPromise,
)

這樣是否好讀多了呢,我們來看一下如果用 promise 會變的怎麼樣

function parseWithoutEffect(): ParseResult {
  const errors = []
  const validData = []
  const raw = loadData()
  for (const [index, item] of raw.entries()) {
    const parsed = ItemSchema.safeParse(item)
    if (parsed.success) {
      validData.push(parsed.data)
    } else {
      errors.push({
        index,
        error: new ValidationError({
          message: 'parse error',
          data: item,
          cause: parsed.error,
        }),
      })
    }
  }
  
  return [errors, validData]
}


function checkAvailableWithPromise(validData: Item[]): Promise<Item[]> {
  // 跟之前一樣,使用 p-map 幫忙處理 concurrency 限制
  return pMap(validData, async (item) => {
    if (!item.serverId) {
      return pMapSkip
    }
    if (!await checkAvailableInDb(item.serverId)) {
      return pMapSkip
    }
    return item
  }, {concurrency: 3})
}

const [errors, validDataWithoutAvailableCheck] = parseWithoutEffect()
const validData = await checkAvailableWithPromise(validDataWithoutAvailableCheck)

目前以我自己看下來, Effect 的版本可讀性看起來跟直接寫的版本差不多了,但 Effect 的版本還是有些好處存在,例如

  1. 更容易測試,在之後的「13. 如何測試 Effect 的程式」我們再來看如何測試
  2. 更容易抽取部份的 pipeline 來重覆使用

下一篇要來介紹 Effect 最強大的功能,排程與錯誤重試


上一篇
7. 用 Effect 打造自訂的錯誤型別
下一篇
9. Effect 的超級魔法:排程與錯誤重試
系列文
Effect 魔法:打造堅不可摧的應用程式12
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言